WorkflowsからFirestoreコネクタを用いてドキュメントを書き込み&取得してみる
概要
データ事業本部の根本です。
どうしてもWorkflowsからFirestoreを扱いたくて扱いたくて、コネクタ用いてを動かしてみました。
今回やりたいことはWorkflowsからコネクタを用いてFirestoreに書き込み、書き込んだデータを読み取る
です。
Firestoreはご存知の通りNoSQLのデータベースです。
無償枠があり(default)
データベースは以下の割り当てを無料で使用できるので気軽に試すことができます。
無料枠 | 割り当て |
---|---|
保存データ | 1 GiB |
ドキュメントの読み取り | 50,000/日 |
ドキュメントの書き込み | 20,000/日 |
ドキュメントの削除 | 20,000/日 |
送信データ転送 | 10 GiB/月 |
ただし以下は有料となるのでご注意ください。
次のオペレーションと機能には、無料の使用量は含まれていません。次の機能を使用するには、課金を有効にする必要があります。
- 名前付き(デフォルト以外の)データベースの使用
- TTL の削除
- PITR データ
- データのバックアップ
- 復元オペレーション
やってみる
今回実装する検証用ワークフローは以下のフローとなります。
- 変数を初期化
- Firestoreに書き込み
- Firestoreから読み取り
- 読み取り結果確認
イメージは以下の通りです。
それでは進めていきましょう。
Firestoreの準備
1つもデータベースを作成していないのであれば、(default)
データベースを作成します。データベースを作成
を押下します。
モードはネイティブモードを選択します。
データベースにDatastoreとの下位互換性を持たせるならDatastoreモードを選択するとのことですが、今回は必要ありません。
データベースID
には(default)
を指定します。初めて作成するデータベースであればデフォルトで入力されているはずです。
リージョンを東京リージョン(asia-northeast1)に設定してその他の値はデフォルトでデータベースを作成します。
問題なく作成が完了すると、データベース(default)に対して設定できる状態となります。
コレクションを開始
を押下します。
コレクションを作成します。コレクションID
に適当なIDを入力して保存します。わたしはworkflows_collection
と名づけました。
コレクションが作成されるとともにランダムなIDが設定されたドキュメントも作成されます。存在していても特に問題ないのでそのままにします。
これでFirestoreの準備は終わりです。続いてWorkflows側の準備に移ります。
Workflowsの準備
サンプルがあったので読み込みます。
読み取り:
書き込み:
上記のサンプルから、読み取り・書き込みに使うコネクタのAPIは以下の2つであるとわかります。
API | 用途 |
---|---|
googleapis.firestore.v1.projects.databases.documents.patch | 書き込み |
googleapis.firestore.v1.projects.databases.documents.get | 読み取り |
※サンプルからと書いてますが、読み取りに関してはリファレンスから探してきました。というのも読み取りのサンプルではコネクタと書いていますがHTTPリクエストを介して呼び出される呼び出し方になっているように見えます。
call: http.get
args:
url: ${"https://firestore.googleapis.com/v1/projects/"+project+"/databases/(default)/documents/"+collection+"/"+document}
auth:
type: OAuth2
上記の形式はコネクタ呼び出しの形式ではなく通常のHTTPリクエストを介しての呼び出しとなると思われます。
注: HTTP リクエストによる Google Cloud サービスの呼び出しと、Workflowsのコネクタを使用した他の Google Cloud プロダクトとワークフローを統合する API オペレーションの実行を混同しないでください。
今回はコネクタでの呼び出しをしたかったためリファレンスからコネクタを探してきました。
さて、以下にリファレンスを元にWorkflowsのワークフローを作成しました。
とりあえずプログラム全文です。
- initialize:
assign:
- project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- workflow_exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
- collection: "workflows_collection"
- values_to_write:
execution_id:
stringValue: ${workflow_exec_id}
target_file_name:
stringValue: "test_file_1"
file_no:
integerValue: 1
- write_to_firestore:
call: googleapis.firestore.v1.projects.databases.documents.patch
args:
name: ${"projects/"+project+"/databases/(default)/documents/"+collection+"/"+workflow_exec_id}
body:
fields: ${values_to_write}
result: write_result
- printWriteResultStep:
call: sys.log
args:
text: ${write_result.name}
- read_item:
try:
call: googleapis.firestore.v1.projects.databases.documents.get
args:
name: ${write_result.name}
result: read_result
except:
as: e
steps:
- is_the_key_found:
switch:
- condition: ${e.code == 404}
next: document_not_found
- condition: ${e.code == 403}
next: auth_error
next: logStep
- document_not_found:
return: "Document not found."
- auth_error:
return: "Authentication error."
- logStep:
call: sys.log
args:
#text: ${document_value.body.fields.execution_id.stringValue}
text: ${read_result}
ステップごとに簡単に解説します。
-
initialize
- 環境変数から以下の情報を取得し、変数に割り当てます。
project
: 組み込み環境変数GOOGLE_CLOUD_PROJECT_ID
を用いてGoogle CloudプロジェクトIDを取得しますworkflow_exec_id
: 組み込み環境変数GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID
を用いてワークフローの実行IDを取得します
- Firestoreのコレクション名と書き込むデータの内容を設定します
collection
: "workflows_collection"という名前のコレクションに設定しますvalues_to_write
: 以下のデータを含むオブジェクトを設定します(テストのため適当に設定しました)execution_id
: ワークフロー実行IDtarget_file_name
: "test_file_1"という文字列file_no
: 整数値の1を保存します
- 環境変数から以下の情報を取得し、変数に割り当てます。
-
write_to_firestore
- Firestoreにデータを書き込むためにコネクタ
googleapis.firestore.v1.projects.databases.documents.patch
を呼び出します - 書き込み先のドキュメント名を生成し、指定されたデータをフィールド(
fields
)としてFirestoreに保存します- ドキュメント名は
projects/{project}/databases/(default)/documents/{collection}/{workflow_exec_id}
の形式です
- ドキュメント名は
- 結果は
write_result
に格納されます
- Firestoreにデータを書き込むためにコネクタ
-
printWriteResultStep
write_result
の名前をログに出力します
-
read_item
- Firestoreからデータを読み取るためにコネクタ
googleapis.firestore.v1.projects.databases.documents.get
を呼び出します - 読み取り結果は
read_result
に格納されます - エラーが発生した場合、エラーコードに応じて
document_not_found
またはauth_error
ステップに進みます
- Firestoreからデータを読み取るためにコネクタ
-
document_not_found
- ドキュメントが見つからなかった場合のエラーメッセージを返します
-
auth_error
- 認証エラーが発生した場合のエラーメッセージを返します
-
logStep
- 読み取った結果をログに出力します
実行してみる
Workflowsのコンソールから実行してみます。
実行に成功したら実行IDを確認してみてください。
私の場合だと実行IDは62032338-01af-4a00-b131-009a81cb4001
でした。
Firestoreを確認します。
作成したドキュメント内のコレクションにWorkflowsの実行ID62032338-01af-4a00-b131-009a81cb4001
を持ったドキュメントがいました。
問題なく書き込みができていることが確認できました。
読み取りができているかWorkflowsのログを確認します。
実行IDや検証用に設定したtarget_file_name
やfile_no
の値が出力されているのが確認できます。こちらも問題なくFirestoreから読み取れていました。
textPayload: "{"body":{"createTime":"2024-10-23T15:07:57.374227Z","fields":{"execution_id":{"stringValue":"62032338-01af-4a00-b131-009a81cb4001"},"file_no":{"integerValue":"1"},"target_file_name":{"stringValue":"test_file_1"}},"name":"projects/***/databases/(default)/documents/workflows_collection/62032338-01af-4a00-b131-009a81cb4001"
補足
Firestoreに書き込む際のデータ型はYAML(Workflows)ではstringValue
など指定した形式で表現する必要があります。
以下に詳細が記載されています。
所感
FirestoreにWorkflowsから書き込み、読み取りをしてみましたがコネクタが用意されているので実装は楽だなと感じました。
WorkflowsとFirestoreを組み合わせたらどんなことができるのかはあまりイメージが湧いていなかったりしますが(ポーリング、処理結果を書き込みとか?)、今後面白い使い方が思い浮かんだらそれも記事にしてみたいと思います。
それではまた。ナマステー
参考